消息隊列 - RabbitMQ實作

陳柏仁/

2023-09-23

/
166 views
message queue rabbitmq
今天,我們將使用nodejs來建立一個小程式,模擬服務之間透過RabbitMQ傳輸。

前言

今天,我們將使用 nodejs 來建立一個小程式,模擬服務之間透過 RabbitMQ 傳輸。預計會建立一個連線(connection),開兩個通道(channel),一個為傳送者,另一個為接收者,在建立連線後,透過 rabbitMQ 每隔一秒傳送"Hello world!"字串,並且印出來。

事前準備

在實作之前,必須先下載並安裝好以下軟體:

實作

安裝

RabbitMQ 支援多種協定。我們將使用 AMQP 0-9-1 協議來連線,請先下載 npm 套件 amqplib

npm install amqplib

建立連線

引入套件,並建立連線

連線需要 connection string,預設值為amqp://localhost:5672。為了安全考量,在生產環境需加入帳號密碼,此時 connection string 格式為: amqp://${username}:${password}@${host},我用程式判斷並提取變數出來當環境變數,避免程式出現密碼明碼,程式碼如下:

const amqp = require("amqplib")
;(async () => {
  const username = process.env.USER_NAME
  const password = process.env.PASSWORD
  const host = process.env.HOST || "localhost:5672"
  try {
    let connectionString = `amqp://${host}`
    if (username && password) {
      connectionString = `amqp://${username}:${password}@${host}`
    }
    const connection = await amqp.connect(connectionString)
    console.log(`[RabbitMQ] Connected to ${host}`)
  } catch (error) {
    console.error(`[RabbitMQ] Error occured from RabbitMQ server: ${host}`)
    console.error(error.stack)
  }
})()

建立接收者

接著建立接收者,首先,建立第一個通道(channel1)及隊列(queue1)

const queue = "queue1"
const channel1 = await connection.createChannel()
await channel1.assertQueue(queue)

建立成功後,透過 consume 函式建立連結並監聽,當接收到訊息時印出來,回傳 ack 確認接收成功

channel1.consume(queue, message => {
  if (message !== null) {
    console.log(`Recieved message:`, message.content.toString())
    channel1.ack(message)
  }
})

建立傳送者

接著建立傳送者,一樣先建立另一個通道(channel2),設定定時器及透過 sendToQueue 函式,每隔一秒傳送"Hello world!"到隊列(queue1)

const channel2 = await connection.createChannel()
setInterval(() => {
  const sendMessage = "Hello world!"
  channel2.sendToQueue(queue, Buffer.from(sendMessage))
}, 1000)

完成並執行

經過整理後,完整程式碼會長這樣:

const amqp = require("amqplib")
;(async () => {
  const username = process.env.USER_NAME
  const password = process.env.PASSWORD
  const host = process.env.HOST || "localhost:5672"
  try {
    let connectionString = `amqp://${host}`
    if (username && password) {
      connectionString = `amqp://${username}:${password}@${host}`
    }
    const connection = await amqp.connect(connectionString)
    console.log(`[RabbitMQ] Connected to ${host}`)
    const queue = "queue1"
    const channel1 = await connection.createChannel()
    await channel1.assertQueue(queue)
    channel1.consume(queue, message => {
      if (message !== null) {
        console.log(`Recieved message:`, message.content.toString())
        channel1.ack(message)
      }
    })
    const channel2 = await connection.createChannel()
    setInterval(() => {
      const sendMessage = "Hello world!"
      channel2.sendToQueue(queue, Buffer.from(sendMessage))
    }, 1000)
  } catch (error) {
    console.error(`[RabbitMQ] Error occured from RabbitMQ server: ${host}`)
    console.error(error.stack)
  }
})()

執行程式,結果如下:

[RabbitMQ] Connected to localhost:5672
Received message: Hello world!
Received message: Hello world!
Received message: Hello world!
Received message: Hello world!

結論

今天透過一個簡單的小程式,實作了 RabbitMQ 的連線,然而,在實作上會將傳送者與接收者分開,一個服務可能就是一個傳送者/接收者,服務與服務之間會透過與 RabbitMQ 的互動來獲取資料。